feat(mqtt): split signing and publish workers #196

Open
linkdata wants to merge 1 commit into dnstapir:main from linkdata:feat/mqtt-signing-pipeline
Conversation

@linkdata

Summary

  • split MQTT work into parallel JWS signing workers and one serial paho publisher
  • make signing and publishing workers respect autopaho context cancellation
  • add tests for signing, bad-key recovery, serial publishing, and cancellation

Tests

  • go test ./pkg/runner ./...

@coderabbitai
Contributor

coderabbitai Bot commented Apr 30, 2026

Warning

Rate limit exceeded

@linkdata has exceeded the limit for the number of commits that can be reviewed per hour. Please wait 59 minutes and 19 seconds before requesting another review.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 08c7a17c-60bc-4189-9385-523344d2fc51

📥 Commits

Reviewing files that changed from the base of the PR and between b615285 and bcbe26b.

📒 Files selected for processing (3)
  • pkg/runner/mqtt.go
  • pkg/runner/mqtt_test.go
  • pkg/runner/runner.go

@linkdata
Author

Additional context from comment files

These notes were split from the local markdown comment files and attached here because they describe this PR's change.

Change 6: JWS sign worker pool

Files: pkg/runner/mqtt.go and the runAutoPaho call site in
pkg/runner/runner.go.

Why this was a bottleneck

runAutoPaho was a single goroutine doing
jws.Sign → cm.Publish for every novel-qname event. CPU profiles
showed jws.Sign at ~14 % of total CPU on the 110 K-qps
configuration — all on one core. So even though the box had 19
other cores idle, the MQTT publish path was capped at the rate of
one core's signing work. At 40 K novel qnames/sec that's roughly
40 K Ed25519 signatures/sec — sustainable but exactly at the
boundary, with no headroom for jitter.

What we changed

runAutoPaho is split into three pieces and renamed:

  • mqttSignWorker(wg, jwk) — N goroutines, each reads from
    mqttPubCh, signs, pushes to mqttSignedCh. Pure CPU, runs on
    any core.
  • mqttPublishWorker(cm, topic, usingFileQueue) — one goroutine,
    reads signed bytes from mqttSignedCh, hands them to paho. Stays
    single-goroutine because paho's ConnectionManager expects one
    publisher (it has its own internal serialisation, but spawning N
    goroutines feeding it adds nothing).
  • startMQTTPipeline(...) — wires up the channels, spawns N sign
    workers + 1 publish worker, and arranges for the publish worker to
    see EOF on the signed channel when all sign workers have exited.

A new top-level config field, mqtt-sign-workers, defaults to
GOMAXPROCS(0) when 0 or unset. The runner.go call site calls
startMQTTPipeline instead of runAutoPaho. runAutoPaho no
longer exists.

mqttPubCh and mqttSignedCh are both buffered at 1024 — the same
philosophy as the Tier-1 input-channel resize: enough to absorb
scheduler jitter and one slow operation, not so much that we hide
real backpressure.

Tradeoff

  • Order of MQTT publishes is no longer guaranteed. With N sign
    workers reading from one channel, two messages enqueued in order A,
    B might be signed B, A and reach the publisher in that swapped
    order. For the new-qname stream this is fine — each event is
    independent and the consumer side does not assume MQTT message
    order. If a future use case ever needs strict ordering, this would
    have to change (or MQTTSignWorkers would have to be set to 1).
  • Slightly more memory in flight. Up to signWorkers + 1024 + 1024
    messages can be queued during a publish stall, versus 1 + 100
    before. At ~3 KiB per signed message that's still under 10 MiB
    worst case.
  • One more goroutine pair on shutdown. The shutdown sequence
    now closes mqttPubCh, sign workers drain and exit, the last one
    closes mqttSignedCh, the publish worker drains and exits. The
    existing autopahoWg covers all of this.
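
As a rough check of the memory figure above, assume 20 sign workers (one per core on the 20-core box mentioned earlier; the worker count is an assumption, not a number stated in these notes) and the quoted ~3 KiB per signed message:

$$
(20 + 1024 + 1024) \times 3\ \text{KiB} = 6204\ \text{KiB} \approx 6.1\ \text{MiB}
$$

The two 1024-slot channel buffers dominate the total, so the bound stays well under 10 MiB for any realistic worker count.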

Expected payoff

JWS signing parallelises across GOMAXPROCS cores instead of running
on one. The 14 % single-core CPU pin disappears from the profile;
signing throughput scales close to linearly with worker count up to
the point where the broker's accept rate (or the network) becomes
the cap. On the load-gen smoke this removes MQTT publishing as the
governor on EDM throughput — the test box now hits the next
bottleneck (network, parquet writers) rather than this one.

MQTT Publish Worker Context Cancellation

  • Bug: In mqttPublishWorker(), the channel read from mqttSignedCh at line 198 was a plain blocking receive with no select on autopahoCtx.Done(). When the context was cancelled while the channel was empty, the goroutine blocked indefinitely and could not exit.
  • Impact: Context cancellation alone could not stop the publish worker; shutdown depended entirely on the channel being closed, which only happens after sign workers finish. This delayed shutdown by requiring the channel closure path instead of honoring immediate cancellation.
  • Fix: Replaced the plain blocking read with a select that listens to both mqttSignedCh and autopahoCtx.Done().
  • Reasoning: All goroutines participating in graceful shutdown must be interruptible via context cancellation.
  • Tests: Added TestMqttPublishWorkerExitsOnContextCancel which starts the worker with an empty channel and verifies it exits promptly (within 2 seconds) when the context is cancelled.

Serial MQTT Publishing

  • Bug: The MQTT publish worker was documented as the single paho writer but spawned a new goroutine for every non-filequeue publish.
  • Impact: Slow publishes could overlap, creating concurrent calls into the connection manager and letting the worker appear drained while publish goroutines were still running.
  • Fix: The worker now calls Publish directly in the single publisher goroutine; signing remains parallel upstream.
  • Reasoning: Back-pressure belongs at the publish boundary, and the lifecycle wait group should represent all publish work accepted by the worker.
  • Tests: Added a fake blocking MQTT connection manager that fails if a second publish starts before the first publish is released.
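
A test double along these lines can detect overlapping publishes. The sketch below is an assumption about how such a fake might look, not the one added in this PR: `fakePublisher`, `runSerial`, and `runSpawning` are hypothetical names, and the fake does not implement paho's real connection manager interface.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// fakePublisher blocks every Publish until released and records whether two
// publishes were ever in flight at the same time.
type fakePublisher struct {
	inFlight   atomic.Int32
	overlapped atomic.Bool
	entered    chan struct{} // one token per Publish that has started
	release    chan struct{} // one token lets one Publish return
}

func newFakePublisher() *fakePublisher {
	return &fakePublisher{entered: make(chan struct{}, 16), release: make(chan struct{})}
}

func (f *fakePublisher) Publish([]byte) {
	if f.inFlight.Add(1) > 1 {
		f.overlapped.Store(true)
	}
	f.entered <- struct{}{}
	<-f.release
	f.inFlight.Add(-1)
}

// runSerial models the fixed worker: Publish is called directly in the
// worker goroutine, so the next receive waits for the current publish.
// n must not exceed the capacity of f.entered (16).
func runSerial(f *fakePublisher, n int) bool {
	ch := make(chan []byte, n)
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		for msg := range ch {
			f.Publish(msg)
		}
	}()
	for i := 0; i < n; i++ {
		ch <- []byte("m")
	}
	close(ch)
	for i := 0; i < n; i++ {
		<-f.entered
		f.release <- struct{}{}
	}
	wg.Wait()
	return f.overlapped.Load()
}

// runSpawning models the old behaviour: one goroutine per publish.
func runSpawning(f *fakePublisher, n int) bool {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			f.Publish([]byte("m"))
		}()
	}
	for i := 0; i < n; i++ {
		<-f.entered // wait until all n publishes are in flight together
	}
	for i := 0; i < n; i++ {
		f.release <- struct{}{}
	}
	wg.Wait()
	return f.overlapped.Load()
}

func main() {
	fmt.Println("serial overlapped:  ", runSerial(newFakePublisher(), 3))
	fmt.Println("spawning overlapped:", runSpawning(newFakePublisher(), 2))
}
```

Because every publish blocks until it is explicitly released, the overlap check does not depend on timing or sleeps.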

Side effect: paho file queue exposure

Tier 2 lifts the JWS-sign rate from ~40 K/s (sequential, single goroutine)
to whatever GOMAXPROCS allows (~10×). The next bottleneck downstream
turned out to be EDM's default-on MQTT file queue, configured at
runner.go:684 to <DataDir>/mqtt/queue. Paho's
autopaho/queue/file.(*Queue).Peek calls oldestEntry() on every
dequeue, which does:

entries, _ := os.ReadDir(dir)     // O(N) directory listing (error handling elided)
for _, e := range entries {
    info, _ := os.Lstat(...)      // O(N) syscalls + allocations
    // pick oldest
}

That is O(N) per dequeue, so draining N queued messages costs O(N²). As the queue fills, the
manager's drain rate falls below the enqueue rate, the queue grows, GC
pressure climbs, and end-to-end throughput collapses. Symptom from the
outside: rate runs steady for some seconds, then plummets to near zero
as the queue crosses ~10 K files. We confirmed this on a stress run by
heap-profiling EDM under sustained 100 K qps:

55% alloc_space   autopaho.managePublishQueue → file.Queue.Peek
                  → oldestEntry → os.ReadDir + os.Lstat

…and observing the queue dir grow to 88 903 files / 350 MB at the
moment of the stall. Goroutine count stayed flat at ~96 — this is not
a goroutine leak; it is allocation pressure compounding into GC stalls.

Why this isn't a Tier 2 regression in the strict sense

Pre-Tier-2, JWS signing capped the rate at ~40 K/s, well below the
queue's collapse point. The queue's O(N²) behaviour was always there;
Tier 2 just removed the upstream throttle that was hiding it. A
deployment that has its broker available and processes load at any
non-trivial sustained rate will eventually hit this regardless of
Tier 2 — Tier 2 only made the time-to-failure noticeable in a 100 K-qps
load test.

What we did

For dev/test setups (the load-gen smoke), the file queue is unnecessary:
the broker is always up, QoS 0 publishes are best-effort anyway, and
the durability the queue provides has no value. The fix is the existing
--disable-mqtt-filequeue flag (already in EDM's CLI surface). The
companion repo's dev.sh now passes it.

Without the file queue, paho buffers messages in-memory only, with its
own bounded internal queue applying real backpressure. In our smoke at
100 K qps with all Tier 1 + Tier 2 changes:

  • edm_new_qname_discarded_total: 0 (vs ~94 % discards before)
  • queue dir: empty
  • end-to-end throughput: stable ~67 K qps for the 3-minute run

What we did not do

We did not change the file queue's default. The queue is meant to
provide durability across broker disconnects — disabling it weakens
that guarantee, and operators in real deployments should make that
choice consciously. The right long-term fix is paho-side: replace the
oldestEntry() directory scan with an in-memory ordered index of the
queue files (the same maintenance pass paho already does for cleanup
could maintain it). That is upstream work, not in scope here.

If you do run a deployment with the file queue enabled at high publish
rates, monitor <DataDir>/mqtt/queue size and pre-emptively trim or
disable when the broker can't keep up. Adding a queue-size Prometheus
metric would also be worth doing — open issue.

@linkdata linkdata marked this pull request as ready for review April 30, 2026 12:12
@linkdata linkdata requested a review from a team as a code owner April 30, 2026 12:12
@jschlyter jschlyter added the ai AI was used to write contributed code label Apr 30, 2026